---
title: DASE Components Explained (Complementary Purchase)
---

<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements.  See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License.  You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->

<%= partial 'shared/dase/dase', locals: { template_name: 'Complementary Purchase Engine Template' } %>

## The Engine Design

As you can see from the Quick Start, *MyComplementaryPurchase* takes a JSON prediction
query, e.g. `{ "items" : ["s2i1"], "num" : 3 }`, and return a JSON predicted result.
In MyComplementaryPurchase/src/main/scala/***Engine.scala***, the `Query` case class
defines the format of such **query**:

```scala
case class Query(items: Set[String], num: Int)
  extends Serializable
```

The `PredictedResult` case class defines the format of **predicted result**,
such as

```json
{
  "rules":[
    {
      "cond":["s2i1"],
      "itemScores":[
        {
          "item":"s2i2",
          "support":0.2,
          "confidence":0.9090909090909091,
          "lift":3.787878787878788
        },
        {
          "item":"s2i3",
          "support":0.14,
          "confidence":0.6363636363636364,
          "lift":3.535353535353535
        }
      ]
    }
  ]
}
```

with:

```scala
case class PredictedResult(rules: Array[Rule])
  extends Serializable

case class Rule(cond: Set[String], itemScores: Array[ItemScore])
  extends Serializable

case class ItemScore(
  item: String, support: Double, confidence: Double, lift: Double
) extends Serializable
```

Finally, `ComplementaryPurchaseEngine` is the *Engine Factory* that defines the
components this engine will use: Data Source, Data Preparator, Algorithm(s) and
Serving components.

```scala
object ComplementaryPurchaseEngine extends IEngineFactory {
  def apply() = {
    new Engine(
      classOf[DataSource],
      classOf[Preparator],
      Map("algo" -> classOf[Algorithm]),
      classOf[Serving])
  }
}
```

Each DASE component of the `ComplementaryPurchaseEngine` will be explained below.

## Data

In the DASE architecture, data is prepared by 2 components sequentially: *DataSource* and *DataPreparator*. They take data
from the data store and prepare them for Algorithm.

### Data Source

In MyComplementaryPurchase/src/main/scala/***DataSource.scala***, the `readTraining`
method of class `DataSource` reads and selects data from the *Event Store*
(data store of the *Event Server*). It returns `TrainingData`.

```scala
case class DataSourceParams(appName: String) extends Params

class DataSource(val dsp: DataSourceParams)
  extends PDataSource[TrainingData,
      EmptyEvaluationInfo, Query, EmptyActualResult] {

  @transient lazy val logger = Logger[this.type]

  override
  def readTraining(sc: SparkContext): TrainingData = {

    // get all "user" "buy" "item" events
    val buyEvents: RDD[BuyEvent] = PEventStore.find(
      appName = dsp.appName,
      entityType = Some("user"),
      eventNames = Some(List("buy")),
      targetEntityType = Some(Some("item")))(sc)
      .map { event =>
        try {
          new BuyEvent(
            user = event.entityId,
            item = event.targetEntityId.get,
            t = event.eventTime.getMillis
          )
        } catch {
          case e: Exception => {
            logger.error(s"Cannot convert ${event} to BuyEvent. ${e}")
            throw e
          }
        }
      }.cache()

    new TrainingData(buyEvents)
  }
}
```

PredictionIO automatically loads the parameters of *datasource* specified in MyComplementaryPurchase/***engine.json***, including *appName*, to `dsp`.

In ***engine.json***:

```
{
  ...
  "datasource": {
    "params" : {
      "appName": "MyApp1"
    }
  },
  ...
}
```

In `readTraining()`, `PEventStore` is an object which provides function to access data that is collected by PredictionIO Event Server.

This Complementary Purchase Engine Template requires "buy" events.

`PEventStore.find(...)` specifies the events that you want to read. In this case, "user buy item" events are read and then each is mapped to a `BuyEvent` object.

`BuyEvent` case class is defined as:

```scala
case class BuyEvent(user: String, item: String, t: Long)
```

`TrainingData` contains an RDD of `BuyEvent` objects. The class definition of `TrainingData` is:

```scala
class TrainingData(
  val buyEvents: RDD[BuyEvent]
) extends Serializable { ... }
```

PredictionIO then passes the returned `TrainingData` object to *Data Preparator*.

NOTE: You could modify the DataSource to read other event other than the default **buy**.

### Data Preparator

In MyComplementaryPurchase/src/main/scala/***Preparator.scala***, the `prepare` method
of class `Preparator` takes `TrainingData` as its input and performs any
necessary feature selection and data processing tasks. At the end, it returns
`PreparedData` which should contain the data *Algorithm* needs.

By default, `prepare` simply copies the unprocessed `TrainingData` data to `PreparedData`:

```scala
class Preparator
  extends PPreparator[TrainingData, PreparedData] {

  @transient lazy val logger = Logger[this.type]

  def prepare(sc: SparkContext, td: TrainingData): PreparedData = {
    new PreparedData(buyEvents = td.buyEvents)
  }
}

class PreparedData(
  val buyEvents: RDD[BuyEvent]
) extends Serializable
```

PredictionIO passes the returned `PreparedData` object to Algorithm's `train` function.

## Algorithm

In MyComplementaryPurchase/src/main/scala/***ALSAlgorithm.scala***, the two methods of
the algorithm class are `train` and `predict`. `train` is responsible for
training the predictive model; `predict` is
responsible for using this model to make prediction.


The default algorithm is based on concept of [Association Rule Learning] (http://en.wikipedia.org/wiki/Association_rule_learning) to find interesting association rules (A implies B) that indicates additional item (B) may be bought together given a list of items (A). A is the *condition* and B is the *consequence*.

### Algorithm parameters

The Algorithm takes the following parameters, as defined by the `AlgorithmParams` case class:

```scala
case class AlgorithmParams(
  basketWindow: Int, // in seconds
  maxRuleLength: Int,
  minSupport: Double,
  minConfidence: Double,
  minLift: Double,
  minBasketSize: Int,
  maxNumRulesPerCond: Int // max number of rules per condition
  ) extends Params

```

Parameter description:

- **basketWindow**: The buy event is considered as the same basket as previous one if the time difference is within this window (in unit of seconds). For example, if it's set to 120, it means that if the user buys item B within 2 minutes of previous purchase (item A), then the item set [A, B] is considered as the same basket. The purchase of this *basket* is referred as one *transaction*.
- **maxRuleLength**: The maximum length of the association rule length. Must be at least 2. For example, rule of "A implies B" has length of 2 while rule "A, B implies C" has a length of 3. Increasing this number will increase the training time significantly because more combinations are considered.
- **minSupport**: The minimum required *support* for the item set to be considered as rule (valid range is 0 to 1). It's the percentage of the item set appearing among all transactions. This is used to filter out infrequent item set. For example, setting to 0.1 means that the item set must appear in 10 % of all transactions.
- **minConfidence**: The minimum *confidence* required for the rules (valid range is 0 to 1). The confidence indicates the probability of the condition and conseuquence appear in the same transaction. For example, if A appears in 30 transactions and the item set [A, B] appears in 20 transactions, then the rule "A implies B" has confidence of 0.66.
- **minLift**: The minimum *lift* required for the rule. It should be set to 1 to find high quality rule. It's the confidence of the rule divided by the support of the consequence. It is used to filter out rules that the consequence is very frequent anyway regardless of the condition.
- **minBasketSize**: The minimum number of items in basket to be considered by algorithm. This value must be at least 2.
- **maxNumRulesPerCond**: Maximum number of rules generated per condition and stored in the model. By default, the top rules are sorted by *lift* score.

INFO: If you import your own data and the engine doesn't return any results, it could be caused by the following reasons: (1) the algorithm parameter constraint is too high and the algo couldn't find rules that satisfy the condition. you could try setting the following param to 0: **minSupport**, **minConfidence**, **minLift** and then see if anything returned (regardless of recommendation quality), and then adjust the parameter accordingly. (2) the complementary purchase engine requires buy event with correct eventTime. If you import data without specifying eventTime, the SDK will use current time because it assumes the event happens in real time (which is not the case if you import as batch offline), resulting in that all buy events are treated as one big transaction while they should be treated as multiple transactions.


The values of these parameters can be specified in *algorithms* of
MyComplementaryPurchase/***engine.json***:

```
{
  ...
  "algorithms": [
    {
      "name": "algo",
      "params": {
        "basketWindow" : 120,
        "maxRuleLength" : 2,
        "minSupport": 0.1,
        "minConfidence": 0.6,
        "minLift" : 1.0,
        "minBasketSize" : 2,
        "maxNumRulesPerCond": 5
      }
    }
  ]
  ...
}
```

PredictionIO will automatically loads these values into the constructor of the `Algorithm` class.

```scala
class Algorithm(val ap: AlgorithmParams)
  extends P2LAlgorithm[PreparedData, Model, Query, PredictedResult] {
    ...
}
```

### train(...)

`train` is called when you run **pio train** to train a predictive model. The algorithm first find all basket transactions, generates and filters the association rules based on the algorithm parameters:

```scala

  def train(sc: SparkContext, pd: PreparedData): Model = {
    val windowMillis = ap.basketWindow * 1000

    ...

    val transactions: RDD[Set[String]] = ...

    val totalTransaction = transactions.count()
    val minSupportCount = ap.minSupport * totalTransaction

    ...

    // generate item sets
    val itemSets: RDD[Set[String]] = transactions
      .flatMap { tran =>
        (1 to ap.maxRuleLength).flatMap(n => tran.subsets(n))
      }

    ...

    val itemSetCount: RDD[(Set[String], Int)] = ...

    ...

    val rules: RDD[(Set[String], RuleScore)] = ...

    val sortedRules = rules.groupByKey
      .mapValues(iter =>
        iter.toVector
          .sortBy(_.lift)(Ordering.Double.reverse)
          .take(ap.maxNumRulesPerCond)
        )
      .collectAsMap.toMap

    new Model(sortedRules)
  }

```

PredictionIO will automatically store the returned model after training, i.e. the `Model` object.

The `Model` stores the top rules for each condition:

```scala
class Model(
  val rules: Map[Set[String], Vector[RuleScore]]
) extends Serializable {
  ...
}
```

### predict(...)

`predict` is called when you send a JSON query to
http://localhost:8000/queries.json. PredictionIO converts the query, such as `{ "items" : ["s2i1"], "num" : 3 }` to the `Query` class you defined previously in `Engine.scala`.

The `predict()` function does the following:

1. find all possible subset of the items in query
2. use the subsets as condition to look up the model and return the rules for each condition.

```scala

  ...

  def predict(model: Model, query: Query): PredictedResult = {
    val conds = (1 to maxCondLength).flatMap(n => query.items.subsets(n))

    val rules = conds.map { cond =>
      model.rules.get(cond).map{ vec =>
        val itemScores = vec.take(query.num).map { rs =>
          new ItemScore(
            item = rs.conseq,
            support = rs.support,
            confidence = rs.confidence,
            lift = rs.lift
          )
        }.toArray
        Rule(cond = cond, itemScores = itemScores)
      }
    }.flatten.toArray

    new PredictedResult(rules)
  }

  ...

```

PredictionIO passes the returned `PredictedResult` object to *Serving*.

## Serving

The `serve` method of class `Serving` processes predicted result. It is also
responsible for combining multiple predicted results into one if you have more
than one predictive model. *Serving* then returns the final predicted result.
PredictionIO will convert it to a JSON response automatically.

In MyComplementaryPurchase/src/main/scala/***Serving.scala***,

```scala
class Serving
  extends LServing[Query, PredictedResult] {

  @transient lazy val logger = Logger[this.type]

  override
  def serve(query: Query,
    predictedResults: Seq[PredictedResult]): PredictedResult = {
    predictedResults.head
  }
}
```

When you send a JSON query to http://localhost:8000/queries.json,
`PredictedResult` from all models will be passed to `serve` as a sequence, i.e.
`Seq[PredictedResult]`.

NOTE: An engine can train multiple models if you specify more than one Algorithm
component in `object ComplementaryPurchaseEngine` inside ***Engine.scala*** and  the corresponding parameters in ***engine.json***. Since only one `Algorithm` is implemented by default, this `Seq` contains one element.
